Wiring It All Together: The Application Entry Point

February 2, 2026

We've built all the pieces of our distributed search cluster:

  • TF-IDF algorithm for document scoring
  • HTTP networking layer for communication
  • Search workers and coordinators
  • ZooKeeper-based leader election and service discovery
  • Role transition handling

Now it's time to wire everything together into a complete, production-ready application.

The Challenge

Our main application needs to:

  1. Parse command-line arguments for configuration
  2. Connect to ZooKeeper with proper error handling
  3. Initialize service registries
  4. Set up leader election and role transitions
  5. Handle graceful shutdown on SIGINT/SIGTERM

Let's build it step by step.

Configuration

First, we define our configuration structure and defaults:

const (
    defaultPort           = 8080
    defaultZkAddress      = "localhost:2181"
    defaultSessionTimeout = 5 * time.Second
    defaultDocumentsDir   = "resources/books"
)

type Config struct {
    Port           int
    ZkAddress      string
    SessionTimeout time.Duration
    DocumentsDir   string
}

Command-Line Parsing

Go's flag package makes argument parsing straightforward:

func parseFlags() *Config {
    config := &Config{}

    flag.IntVar(&config.Port, "port", defaultPort, "HTTP server port")
    flag.StringVar(&config.ZkAddress, "zk", defaultZkAddress, "ZooKeeper address")
    flag.DurationVar(&config.SessionTimeout, "timeout", defaultSessionTimeout,
        "ZooKeeper session timeout")
    flag.StringVar(&config.DocumentsDir, "docs", defaultDocumentsDir,
        "Directory containing documents")

    help := flag.Bool("help", false, "Show help message")
    flag.BoolVar(help, "h", false, "Show help message (shorthand)")

    flag.Parse()

    if *help {
        printUsage()
        os.Exit(0)
    }

    return config
}
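
parseFlags calls a small printUsage helper that isn't shown above. A minimal sketch of such a helper, simply wrapping the flag package's built-in defaults (the exact wording and formatting in the repo may differ):

// printUsage prints a short banner followed by the auto-generated flag help.
// This is an illustrative sketch, not necessarily the repo's exact output.
func printUsage() {
    fmt.Fprintf(os.Stderr, "Usage: %s [options]\n\n", os.Args[0])
    fmt.Fprintln(os.Stderr, "Distributed search cluster node. Options:")
    flag.PrintDefaults()
}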

This gives us a flexible CLI:

# Start with defaults
./node
# Custom port and ZooKeeper address
./node -port 8081 -zk zookeeper.example.com:2181
# Custom documents directory
./node -docs /path/to/documents

ZooKeeper Connection

Connecting to ZooKeeper requires handling the asynchronous nature of the client:

func connectToZooKeeper(address string, timeout time.Duration) (*zk.Conn, error) {
    servers := []string{address}

    conn, eventChan, err := zk.Connect(servers, timeout)
    if err != nil {
        return nil, fmt.Errorf("failed to connect: %w", err)
    }

    // Handle ZooKeeper events in the background.
    go handleZkEvents(eventChan)

    // Wait for the session to be established (up to 10 x 500ms = 5s).
    for i := 0; i < 10; i++ {
        state := conn.State()
        if state == zk.StateHasSession {
            return conn, nil
        }
        if state == zk.StateAuthFailed {
            conn.Close()
            return nil, fmt.Errorf("authentication failed")
        }
        time.Sleep(500 * time.Millisecond)
    }

    // No session after the retry budget: fail fast instead of pretending
    // the connection was established.
    conn.Close()
    return nil, fmt.Errorf("timed out waiting for ZooKeeper session")
}

The event handler logs connection state changes:

func handleZkEvents(eventChan <-chan zk.Event) {
    for event := range eventChan {
        switch event.State {
        case zk.StateConnected:
            log.Printf("ZooKeeper: Connected")
        case zk.StateHasSession:
            log.Printf("ZooKeeper: Session established")
        case zk.StateDisconnected:
            log.Printf("ZooKeeper: Disconnected (will auto-reconnect)")
        case zk.StateExpired:
            log.Printf("ZooKeeper: Session expired - node should restart")
        case zk.StateAuthFailed:
            log.Printf("ZooKeeper: Authentication failed")
        }
    }
}
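
Session expiry is the one state the client cannot recover from on its own: the node's ephemeral znodes are gone, and its cluster membership with them. One option (a sketch, not what the series code does, and the function name is hypothetical) is to treat expiry as fatal and let a supervisor such as systemd or Kubernetes restart the process:

// handleZkEventsStrict behaves like handleZkEvents but treats session expiry
// as fatal, exiting so that a supervisor restarts the node and it rejoins the
// cluster with a fresh session.
func handleZkEventsStrict(eventChan <-chan zk.Event) {
    for event := range eventChan {
        if event.State == zk.StateExpired {
            log.Printf("ZooKeeper: Session expired - exiting for supervisor restart")
            os.Exit(1)
        }
        log.Printf("ZooKeeper: state changed to %v", event.State)
    }
}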

The Main Function

Here's where everything comes together:

func main() {
    config := parseFlags()

    log.Printf("Starting distributed search cluster node")
    log.Printf("Configuration: port=%d, zk=%s, timeout=%v, docs=%s",
        config.Port, config.ZkAddress, config.SessionTimeout, config.DocumentsDir)

    // Step 1: Connect to ZooKeeper
    zkConn, err := connectToZooKeeper(config.ZkAddress, config.SessionTimeout)
    if err != nil {
        log.Fatalf("Failed to connect to ZooKeeper: %v", err)
    }
    defer zkConn.Close()

    // Step 2: Initialize service registries
    workersRegistry, err := cluster.NewZkServiceRegistry(
        zkConn, cluster.WorkersRegistryPath)
    if err != nil {
        log.Fatalf("Failed to create workers registry: %v", err)
    }
    coordinatorsRegistry, err := cluster.NewZkServiceRegistry(
        zkConn, cluster.CoordinatorsRegistryPath)
    if err != nil {
        log.Fatalf("Failed to create coordinators registry: %v", err)
    }

    // Step 3: Create role transition handler
    electionAction := app.NewOnElectionAction(
        workersRegistry,
        coordinatorsRegistry,
        config.Port,
        config.DocumentsDir,
    )

    // Step 4: Set up leader election
    leaderElection, err := cluster.NewLeaderElection(zkConn, electionAction)
    if err != nil {
        log.Fatalf("Failed to create leader election: %v", err)
    }
    if err := leaderElection.VolunteerForLeadership(); err != nil {
        log.Fatalf("Failed to volunteer for leadership: %v", err)
    }

    // Step 5: Run initial election
    if err := leaderElection.ReelectLeader(); err != nil {
        log.Fatalf("Failed to run initial election: %v", err)
    }
    log.Printf("Node started, current role: %s", electionAction.GetCurrentRole())

    // Step 6: Wait for shutdown signal
    shutdown := make(chan os.Signal, 1)
    signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM)
    sig := <-shutdown
    log.Printf("Received signal %v, initiating graceful shutdown...", sig)

    // Step 7: Graceful shutdown
    if err := electionAction.Stop(); err != nil {
        log.Printf("Error during shutdown: %v", err)
    }
    log.Printf("Node shutdown complete")
}

Signal Handling

Proper signal handling is crucial for production systems:

shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM)
sig := <-shutdown

This catches:

  • SIGINT (Ctrl+C)
  • SIGTERM (sent by process managers like systemd or Kubernetes)

When a signal is received, we shut down in three steps (sketched below):

  1. Stop the HTTP server gracefully
  2. Unregister from service registries
  3. Close the ZooKeeper connection
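
The first two steps live in OnElectionAction.Stop, which we built in earlier posts; the third is the deferred zkConn.Close() in main. Here's a minimal sketch of what such a Stop method might look like, assuming the action holds the HTTP server and its registry entry (the field and method names here are illustrative, not the repo's exact API):

// Stop is a sketch of the graceful-shutdown path: stop serving traffic,
// then remove this node from its registry. The ZooKeeper connection itself
// is closed by the deferred zkConn.Close() in main.
func (a *OnElectionAction) Stop() error {
    log.Printf("Stopping OnElectionAction")

    // 1. Stop the HTTP server gracefully, letting in-flight requests finish.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := a.httpServer.Shutdown(ctx); err != nil {
        return fmt.Errorf("stopping HTTP server: %w", err)
    }

    // 2. Unregister so other nodes stop routing requests to us.
    if err := a.currentRegistry.Unregister(); err != nil {
        return fmt.Errorf("unregistering from cluster: %w", err)
    }

    return nil
}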

The Startup Sequence

Here's what happens when a node starts:

Node Startup

  1. Parse command-line arguments
     └─ port=8080, zk=localhost:2181, docs=resources/books

  2. Connect to ZooKeeper
     └─ Wait for session establishment

  3. Initialize registries
     ├─ /workers_service_registry
     └─ /coordinators_service_registry

  4. Create OnElectionAction
     └─ Handles role transitions

  5. Create LeaderElection
     └─ Creates /election znode if needed

  6. Volunteer for leadership
     └─ Creates ephemeral sequential znode /election/c_XXXXX

  7. Run election
     ├─ If smallest → OnElectedToBeLeader()
     └─ Otherwise → OnWorker() + watch predecessor

  8. Block waiting for shutdown signal

Testing the Application

We test the configuration parsing in isolation:

func TestParseFlags_Defaults(t *testing.T) {
    flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
    os.Args = []string{"node"}

    config := parseFlags()

    if config.Port != defaultPort {
        t.Errorf("expected port %d, got %d", defaultPort, config.Port)
    }
    if config.ZkAddress != defaultZkAddress {
        t.Errorf("expected zk %s, got %s", defaultZkAddress, config.ZkAddress)
    }
}

func TestParseFlags_CustomValues(t *testing.T) {
    flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
    os.Args = []string{
        "node",
        "-port", "9090",
        "-zk", "zk.example.com:2181",
        "-timeout", "10s",
        "-docs", "/custom/docs",
    }

    config := parseFlags()

    if config.Port != 9090 {
        t.Errorf("expected port 9090, got %d", config.Port)
    }
    if config.SessionTimeout != 10*time.Second {
        t.Errorf("expected timeout 10s, got %v", config.SessionTimeout)
    }
}

Running the Cluster

To run a 3-node cluster locally:

# Terminal 1: Start ZooKeeper
docker run -p 2181:2181 zookeeper
# Terminal 2: Start first node
./node -port 8080
# Terminal 3: Start second node
./node -port 8081
# Terminal 4: Start third node
./node -port 8082

You'll see output like:

# Node 1 (becomes leader)
Starting distributed search cluster node
Configuration: port=8080, zk=localhost:2181, timeout=5s, docs=resources/books
Successfully connected to ZooKeeper at localhost:2181
Initialized service registries
Volunteered for leadership with znode: c_0000000001
Elected as leader with znode: c_0000000001
Transitioning to LEADER role on port 8080
Successfully transitioned to LEADER role, serving at http://localhost:8080/search
Node started, current role: leader
# Node 2 (becomes worker)
Starting distributed search cluster node
Configuration: port=8081, zk=localhost:2181, timeout=5s, docs=resources/books
Successfully connected to ZooKeeper at localhost:2181
Initialized service registries
Volunteered for leadership with znode: c_0000000002
Not leader. Watching predecessor: c_0000000001
Transitioning to WORKER role on port 8081
Successfully transitioned to WORKER role
Node started, current role: worker

Graceful Shutdown

When you press Ctrl+C:

^CReceived signal interrupt, initiating graceful shutdown...
Stopping OnElectionAction
Stopping HTTP server on port 8080
Unregistered from cluster: /coordinators_service_registry/n_0000000001
Node shutdown complete

The other nodes will detect the leader's departure and trigger re-election automatically.
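
That failover is driven by the watch each follower keeps on its predecessor's znode, covered in the leader-election post. Conceptually, the follower side looks something like this sketch (the function signature is illustrative; the real logic lives inside the LeaderElection type):

// watchPredecessor is a sketch of the follower side of the election: block on
// the watch channel for the predecessor znode (obtained from an ExistsW/GetW
// call) and re-run the election when that znode is deleted.
func watchPredecessor(watchChan <-chan zk.Event, election *cluster.LeaderElection) {
    for event := range watchChan {
        if event.Type == zk.EventNodeDeleted {
            log.Printf("Predecessor znode deleted, re-running election")
            if err := election.ReelectLeader(); err != nil {
                log.Printf("Re-election failed: %v", err)
            }
            return
        }
    }
}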

Production Considerations

For production deployments, consider:

  1. Health checks: Add a /health endpoint for load balancers (sketched below)
  2. Metrics: Export Prometheus metrics for monitoring
  3. Logging: Use structured logging (e.g., zerolog, zap)
  4. Configuration: Support environment variables and config files
  5. Containerization: Create a Dockerfile for easy deployment
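
For the first point, a health endpoint can be as small as the sketch below, registered on the same mux the node already serves search from (the handler name and response body are illustrative):

// healthHandler is a minimal liveness probe: if the process can answer HTTP,
// it reports healthy. A production check might also verify the ZooKeeper
// session state before answering 200.
func healthHandler(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusOK)
    fmt.Fprintln(w, `{"status":"ok"}`)
}

// Registered alongside the existing /search handler, e.g.:
//   mux.HandleFunc("/health", healthHandler)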

What's Next?

In the final post, we'll run the complete test suite and verify all 12 correctness properties pass.

Get the Code

git clone git@github.com:UnplugCharger/distributed_doc_search.git
cd distributed_doc_search
git checkout 07-application-entry-point
go build ./cmd/node
./node -help

This post is part of the "Distributed Document Search" series. Follow along as we build a production-ready search cluster from scratch.